kube-proxy 运行在 Kubernetes 集群的计算节点上,负责 Service 的负载均衡及服务代理。
原理 kube-proxy 工作的核心的流程是从 kube-apiserver 同步 service 和 endpoint 的信息,然后将其更新到 iptables。从 1.11 版本开始,基于 IPVS 的负载均衡已经 GA 了,可以预见的是未来 k8s 会主推 IPVS 模式。
kube-proxy 目前有3种常见的 proxyMode,分别是 userspace,iptables,ipvs,还有一种是 Windows 平台的 kernelspace。其中 userspace mode 是 v1.0 及以前版本的默认模式。从 v1.1 版本开始,增加了 iptables mode,在 v1.3版本中正式替代了 userspace 模式成为默认模式(需要 iptables 的版本>= 1.4.11)。在实践中,社区发现 iptables 的扩展性和性能都不佳,毕竟 iptables 是为防火墙设计的,它的特性并不适合当做大规模服务的负载均衡,华为贡献了基于 IPVS 实现 kube-proxy 的特性。IPVS 是 LVS 的负载均衡模块,同样基于 netfilter,但比 iptables 性能更好,具备更好的可扩展性,鉴于 IPVS 是 kube-proxy 的未来趋势,在阅读 kube-proxy 源码的时候可以着重看这部分。
下面我们分别来看 kube-proxy 的三种模式的实现。对于 userspace,iptables 这两种模式,本文只做简述,对于 ipvs 模式,会结合其设计文档,代码进行分析。
userspace mode 基于用户态的 proxy,service 的请求会先从用户空间进入内核 iptables,然后再回到用户空间,由 kube-proxy 完成后端 endpoints 的选择和代理工作,这种方式流量从用户空间进出内核带来的性能损耗比较大。原理如下图:
示例 摘自 kubernetes入门之kube-proxy实现原理
现在有一个 service ssh-service1
1 2 3 4 $ kubectl get service NAME LABELS SELECTOR IP(S) PORT(S) kubernetes component=apiserver,provider=kubernetes <none> 10.254.0.1 443/TCP ssh-service1 name=ssh,role=service ssh-service=true 10.254.132.107 2222/TCP
这个 service 的 cluster ip 是 10.254.132.107。
1 2 3 4 5 6 7 8 9 10 11 12 $ kubectl describe service ssh-service1 Name: ssh-service1 Namespace: default Labels: name=ssh,role=service Selector: ssh-service=true Type: LoadBalancer IP: 10.254.132.107 Port: <unnamed> 2222/TCP NodePort: <unnamed> 30239/TCP Endpoints: <none> Session Affinity: None No events.
这时候 iptables 的规则是这样的:
1 2 3 4 5 6 $ sudo iptables -S -t nat ... -A KUBE-NODEPORT-CONTAINER -p tcp -m comment --comment "default/ssh-service1:" -m tcp --dport 30239 -j REDIRECT --to-ports 36463 -A KUBE-NODEPORT-HOST -p tcp -m comment --comment "default/ssh-service1:" -m tcp --dport 30239 -j DNAT --to-destination 10.0.0.5:36463 -A KUBE-PORTALS-CONTAINER -d 10.254.132.107/32 -p tcp -m comment --comment "default/ssh-service1:" -m tcp --dport 2222 -j REDIRECT --to-ports 36463 -A KUBE-PORTALS-HOST -d 10.254.132.107/32 -p tcp -m comment --comment "default/ssh-service1:" -m tcp --dport 2222 -j DNAT --to-destination 10.0.0.5:36463
这个 node 的 ip 是 10.0.0.5,那么访问 10.0.0.5:30239 就会被转发到本机的 36463 端口,在访问 10.254.132.107:2222 时,也会转发到 36463 端口,36463 端口是由 kube-proxy 监听的,流量接着会被导到后端的 pod 上。
iptables mode iptables 的方式是完全通过内核的 iptables 实现 service 的代理和 LB,这是 v1.2 及以后版本的默认模式,原理如下图:
这种方式通过 iptable NAT 完成转发,也有一定的性能损耗;此外,iptables 没有增量更新的功能,如果更新一条规则需要整体刷新,时间长,而且对服务的稳定性也有影响;iptable 是串行的,一个 node 上如果有很多的 iptables 规则,流量需要经过所有的匹配再进行转发,在服务规模比较大的情况下对时间、CPU、内存都有比较大的消耗。这导致的一个结果是,大型企业将 k8s 用于生产时,不会直接 kube-proxy 作为服务代理,而是使用 NodePort 或使用 externalIP(比如这篇文章中的例子 ),或自己开发,配置负载均衡代替 kube-proxy。
示例 摘自 kubernetes入门之kube-proxy实现原理
创建 mysql-service 的 service:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 apiVersion: v1 kind: Service metadata: labels: name: mysql role: service name: mysql-service spec: ports: - port: 3306 targetPort: 3306 nodePort: 30964 type: NodePort selector: mysql-service: "true"
这个服务的 cluster ip 是 10.254.162.44,代理的两个 pod 的 ip 是 192.168.125.129 和192.168.125.131,再看 iptables
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 $ iptables -S -t nat ... -A PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES -A OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES -A POSTROUTING -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING -A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000 -A KUBE-NODEPORTS -p tcp -m comment --comment "default/mysql-service:" -m tcp --dport 30964 -j KUBE-MARK-MASQ -A KUBE-NODEPORTS -p tcp -m comment --comment "default/mysql-service:" -m tcp --dport 30964 -j KUBE-SVC-67RL4FN6JRUPOJYM -A KUBE-SEP-ID6YWIT3F6WNZ47P -s 192.168.125.129/32 -m comment --comment "default/mysql-service:" -j KUBE-MARK-MASQ -A KUBE-SEP-ID6YWIT3F6WNZ47P -p tcp -m comment --comment "default/mysql-service:" -m tcp -j DNAT --to-destination 192.168.125.129:3306 -A KUBE-SEP-IN2YML2VIFH5RO2T -s 192.168.125.131/32 -m comment --comment "default/mysql-service:" -j KUBE-MARK-MASQ -A KUBE-SEP-IN2YML2VIFH5RO2T -p tcp -m comment --comment "default/mysql-service:" -m tcp -j DNAT --to-destination 192.168.125.131:3306 -A KUBE-SERVICES -d 10.254.162.44/32 -p tcp -m comment --comment "default/mysql-service: cluster IP" -m tcp --dport 3306 -j KUBE-SVC-67RL4FN6JRUPOJYM -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS -A KUBE-SVC-67RL4FN6JRUPOJYM -m comment --comment "default/mysql-service:" -m statistic --mode random --probability 0.50000000000 -j KUBE-SEP-ID6YWIT3F6WNZ47P -A KUBE-SVC-67RL4FN6JRUPOJYM -m comment --comment "default/mysql-service:" -j KUBE-SEP-IN2YML2VIFH5RO2T
如果通过 node 的 30964 端口访问,匹配的是下面两条链:
1 2 -A KUBE-NODEPORTS -p tcp -m comment --comment "default/mysql-service:" -m tcp --dport 30964 -j KUBE-MARK-MASQ -A KUBE-NODEPORTS -p tcp -m comment --comment "default/mysql-service:" -m tcp --dport 30964 -j KUBE-SVC-67RL4FN6JRUPOJYM
如果直接访问 cluster ip (10.254.162.44),匹配的是下面的规则:
1 -A KUBE-SERVICES -d 10.254.162.44/32 -p tcp -m comment --comment "default/mysql-service: cluster IP" -m tcp --dport 3306 -j KUBE-SVC-67RL4FN6JRUPOJYM
上述的两种匹配都会跳转到 KUBE-SVC-67RL4FN6JRUPOJYM 的链。
1 2 -A KUBE-SVC-67RL4FN6JRUPOJYM -m comment --comment "default/mysql-service:" -m statistic --mode random --probability 0.50000000000 -j KUBE-SEP-ID6YWIT3F6WNZ47P -A KUBE-SVC-67RL4FN6JRUPOJYM -m comment --comment "default/mysql-service:" -j KUBE-SEP-IN2YML2VIFH5RO2T
50% 的概率匹配 KUBE-SEP-ID6YWIT3F6WNZ47P,50% 的概率匹配 KUBE-SEP-IN2YML2VIFH5RO2T。
KUBE-SEP-ID6YWIT3F6WNZ47P 的作用是通过 DNAT 发送到192.168.125.129的3306端口,KUBE-SEP-IN2YML2VIFH5RO2T 同理,发送的是192.168.125.131的3306端口。
1 2 3 4 -A KUBE-SEP-ID6YWIT3F6WNZ47P -s 192.168.125.129/32 -m comment --comment "default/mysql-service:" -j KUBE-MARK-MASQ -A KUBE-SEP-ID6YWIT3F6WNZ47P -p tcp -m comment --comment "default/mysql-service:" -m tcp -j DNAT --to-destination 192.168.125.129:3306 -A KUBE-SEP-IN2YML2VIFH5RO2T -s 192.168.125.131/32 -m comment --comment "default/mysql-service:" -j KUBE-MARK-MASQ -A KUBE-SEP-IN2YML2VIFH5RO2T -p tcp -m comment --comment "default/mysql-service:" -m tcp -j DNAT --to-destination 192.168.125.131:3306
IPVS 如果是使用 Kubernetes 1.8 或更高版本,可以使用 ipvs 模式,它是对 iptables 的替换。ipvs 模式下增加规则是增量式的,不会强制全量刷新,匹配服务时也不会进行串行匹配,而是通过一定规则进行哈希匹配,以找到相应的规则。
相比于 iptables,它具备更高的性能和稳定性。下图是华为云提供的压测数据:
IPVS 是 LVS 的一个核心软件模块,所以我们先介绍 LVS。LVS 是 Linux VIrtual Server 的缩写,这是由章文嵩博士 发起的项目,目前已经合并到了 Linux 内核中。
IPVS 是 LVS 的 IP 负载均衡模块,安装在 LVS 集群作为负载均衡的主节点。
LVS 集群中有 Director 和 Real Server 两个角色,有三种类型的 IP 地址:
Director Virtual IP,调度器用于与客户端通信的 IP 地址,简称为 VIP
Director IP,调度器用于与 Real Server 通信的 IP 地址,简称为 DIP
Real IP,后端主机与调度器通信的 IP 地址,简称为 RIP
LVS 有三种调度模式:
LVS-TUN IPTuneling
LVS-DR Direct Routing
目前 kube-proxy 的实现选择的是 NAT 模式。
LVS 有十种调度算法:
静态方法,根据算法本身进行轮询调度
RR, Round Robin
WRR,Wrighted RR
SH,SourceIP Hash
DH,Destination Hash
动态方法,根据算法以及 RS 的当前负载状态进行调度
LC,least connections
WLC,Weighted Least Connection
SED,Shortest Expection Delay
NQ,Never Queue
LBLC,Locality-Based Least Connection
LBLCR,Locality-Based Least Connections withReplication
kube-proxy 可以通过 --ipvs-scheduler
参数选择调度算法,默认情况下是 Round Robin 算法。
关于 LVS 的详细介绍,可以查阅 Linux服务器集群系统(一) 。
创建一个 service 后,k8s 会在每个节点上创建一个网卡,同时绑定在 Service IP(VIP)上,这时内核会认为 VIP 就是本机 IP,通过 socket 调用,创建 IPVS 的 virtual server 和 real server,分别对应 k8s 的 Service 和 Endpoints。socket 的调用由 docker 公司的 libnetwork 库完成。
代码解析 了解了 kube-proxy 的原理,再阅读代码就很容易理解了。代码版本是 v1.12.2-beta.0。
Run() Run
方法在 cmd/kube-proxy/proxy.go
中。
1 2 3 4 5 6 7 8 9 10 11 12 func (o *Options) Run () error { if len (o.WriteConfigTo) > 0 { return o.writeConfigFile() } proxyServer, err := NewProxyServer(o) if err != nil { return err } return proxyServer.Run() }
逻辑很简单。首先通过 NewProxyServer
构造一个 ProxyServer
,然后调用它的 Run
方法来运行。
初始化 ProxyServer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 func newProxyServer ( config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExit bool , cleanupIPVS bool , scheme *runtime.Scheme, master string ) (*ProxyServer, error) { if config == nil { return nil , errors.New("config is required" ) } if c, err := configz.New(proxyconfigapi.GroupName); err == nil { c.Set(config) } else { return nil , fmt.Errorf("unable to register configz: %s" , err) } protocol := utiliptables.ProtocolIpv4 if net.ParseIP(config.BindAddress).To4() == nil { glog.V(0 ).Infof("IPv6 bind address (%s), assume IPv6 operation" , config.BindAddress) protocol = utiliptables.ProtocolIpv6 } var iptInterface utiliptables.Interface var ipvsInterface utilipvs.Interface var kernelHandler ipvs.KernelHandler var ipsetInterface utilipset.Interface var dbus utildbus.Interface execer := exec.New() dbus = utildbus.New() iptInterface = utiliptables.New(execer, dbus, protocol) kernelHandler = ipvs.NewLinuxKernelHandler() ipsetInterface = utilipset.New(execer) canUseIPVS, _ := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface) if canUseIPVS { ipvsInterface = utilipvs.New(execer) } if cleanupAndExit { return &ProxyServer{ execer: execer, IptInterface: iptInterface, IpvsInterface: ipvsInterface, IpsetInterface: ipsetInterface, CleanupAndExit: cleanupAndExit, }, nil } client, eventClient, err := createClients(config.ClientConnection, master) if err != nil { return nil , err } hostname, err := utilnode.GetHostname(config.HostnameOverride) if err != nil { return nil , err } eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "kube-proxy" , Host: hostname}) nodeRef := &v1.ObjectReference{ Kind: "Node" , Name: hostname, UID: types.UID(hostname), Namespace: "" , } var healthzServer *healthcheck.HealthzServer var healthzUpdater healthcheck.HealthzUpdater if len (config.HealthzBindAddress) > 0 { healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2 *config.IPTables.SyncPeriod.Duration, recorder, nodeRef) healthzUpdater = healthzServer } var proxier proxy.ProxyProvider var serviceEventHandler proxyconfig.ServiceHandler var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string (config.Mode), iptInterface, kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{}) nodeIP := net.ParseIP(config.BindAddress) if nodeIP.IsUnspecified() { nodeIP = getNodeIP(client, hostname) } if proxyMode == proxyModeIPTables { } else if proxyMode == proxyModeIPVS { glog.V(0 ).Info("Using ipvs Proxier." ) proxierIPVS, err := ipvs.NewProxier( iptInterface, ipvsInterface, ipsetInterface, utilsysctl.New(), execer, config.IPVS.SyncPeriod.Duration, config.IPVS.MinSyncPeriod.Duration, config.IPVS.ExcludeCIDRs, config.IPTables.MasqueradeAll, int (*config.IPTables.MasqueradeBit), config.ClusterCIDR, hostname, nodeIP, recorder, healthzServer, config.IPVS.Scheduler, config.NodePortAddresses, ) if err != nil { return nil , fmt.Errorf("unable to create proxier: %v" , err) } metrics.RegisterMetrics() proxier = proxierIPVS serviceEventHandler = proxierIPVS endpointsEventHandler = proxierIPVS glog.V(0 ).Info("Tearing down inactive rules." ) userspace.CleanupLeftovers(iptInterface) iptables.CleanupLeftovers(iptInterface) } else { glog.V(0 ).Info("Using userspace Proxier." ) } iptInterface.AddReloadFunc(proxier.Sync) return &ProxyServer{ Client: client, EventClient: eventClient, IptInterface: iptInterface, IpvsInterface: ipvsInterface, IpsetInterface: ipsetInterface, execer: execer, Proxier: proxier, Broadcaster: eventBroadcaster, Recorder: recorder, ConntrackConfiguration: config.Conntrack, Conntracker: &realConntracker{}, ProxyMode: proxyMode, NodeRef: nodeRef, MetricsBindAddress: config.MetricsBindAddress, EnableProfiling: config.EnableProfiling, OOMScoreAdj: config.OOMScoreAdj, ResourceContainer: config.ResourceContainer, ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, ServiceEventHandler: serviceEventHandler, EndpointsEventHandler: endpointsEventHandler, HealthzServer: healthzServer, }, nil }
以上代码的基本流程是:
创建 iptables 的接口 iptInterface
以及 ipvs 的接口 ipvsInterface
根据上面的字段初始化 ProxyServer
创建健康检查服务
初始化 serviceEventHandler
和 endpointsEventHandler
,用于定义 service 和 endpoints 的发生变化后的处理方法。
根据 proxyMode 进入不同的分支
如果是使用 ipvs 模式,调用 ./pkg/proxy/ipvs/proxier.go
的 NewProxier
进行初始化。iptable, userspace 模式同理
需要注意的是调用 NewProxier
过程中,有 proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
这么一行,用来初始化 syncRunner
,这里很重要,在下面我们会看到。
运行 ProxyServer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 func (s *ProxyServer) Run () error { if s.CleanupAndExit { encounteredError := userspace.CleanupLeftovers(s.IptInterface) encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError encounteredError = ipvs.CleanupLeftovers(s.IpvsInterface, s.IptInterface, s.IpsetInterface, s.CleanupIPVS) || encounteredError if encounteredError { return errors.New("encountered an error while tearing down rules." ) } return nil } var oomAdjuster *oom.OOMAdjuster if s.OOMScoreAdj != nil { oomAdjuster = oom.NewOOMAdjuster() if err := oomAdjuster.ApplyOOMScoreAdj(0 , int (*s.OOMScoreAdj)); err != nil { glog.V(2 ).Info(err) } } if len (s.ResourceContainer) != 0 { if err := resourcecontainer.RunInResourceContainer(s.ResourceContainer); err != nil { glog.Warningf("Failed to start in resource-only container %q: %v" , s.ResourceContainer, err) } else { glog.V(2 ).Infof("Running in resource-only container %q" , s.ResourceContainer) } } if s.Broadcaster != nil && s.EventClient != nil { s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("" )}) } informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod) serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod) serviceConfig.RegisterEventHandler(s.ServiceEventHandler) go serviceConfig.Run(wait.NeverStop) endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod) endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler) go endpointsConfig.Run(wait.NeverStop) go informerFactory.Start(wait.NeverStop) s.birthCry() s.Proxier.SyncLoop() return nil }
如果运行时带了 CleanupAndExit
参数,清理所有的 iptables,ipvs 的规则然后退出。OOM adjuster 和 ResourceContainer 目前都没有完整实现,先跳过,接着启动健康检查服务,metrics 服务器,用于监控,最后启动 informer,注册 service,endpoint 的监听事件,这两个 handler 都是之前注册的 proxierIPVS
,当监听到 service,endpoint 的变化,就会触发响应的方法,注意到 ProxyServer
中定义的 ServiceEventHandler
和 EndpointsEventHandler
分别是 ServiceHandler
,EndpointsHandler
类型,实际上定义的是一个接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 type ServiceHandler interface { OnServiceAdd(service *v1.Service) OnServiceUpdate(oldService, service *v1.Service) OnServiceDelete(service *v1.Service) OnServiceSynced() } type EndpointsHandler interface { OnEndpointsAdd(endpoints *v1.Endpoints) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) OnEndpointsDelete(endpoints *v1.Endpoints) OnEndpointsSynced() }
在 pkg/proxy/ipvs/proxier.go
的 Proxier
中实现了这些方法,可以一一对照查看。Run
方法最后会调用 s.Proxier.SyncLoop
,也属于 Proxier 的实现,这些都在 pkg/proxy/ipvs
模块里。
ipvs 以监听到增加 service 的事件为例,我们看看 Proxier 会做什么。
1 2 3 4 5 6 7 8 9 func (proxier *Proxier) OnServiceAdd (service *v1.Service) { proxier.OnServiceUpdate(nil , service) } func (proxier *Proxier) OnServiceUpdate (oldService, service *v1.Service) { if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { proxier.syncRunner.Run() } }
Update 方法在 pkg/proxy/service.go
中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func (sct *ServiceChangeTracker) Update (previous, current *v1.Service) bool { svc := current if svc == nil { svc = previous } if svc == nil { return false } namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name} sct.lock.Lock() defer sct.lock.Unlock() change, exists := sct.items[namespacedName] if !exists { change = &serviceChange{} change.previous = sct.serviceToServiceMap(previous) sct.items[namespacedName] = change } change.current = sct.serviceToServiceMap(current) if reflect.DeepEqual(change.previous, change.current) { delete (sct.items, namespacedName) } return len (sct.items) > 0 }
实际上只是对比 Service 的变化,然后将其存在一个 Map 数据结构中,最后 OnServiceUpdate
会调用 proxier.syncRunner.Run()
。
1 2 3 4 5 6 func (bfr *BoundedFrequencyRunner) Run () { select { case bfr.run <- struct {}{}: default : } }
Run
方法会发送一个信号到 BoundedFrequencyRunner
的 run 这个 channel。还记得 Run
方法的最后,我们调用了 SyncLoop
方法:
1 2 3 4 5 6 func (proxier *Proxier) SyncLoop () { if proxier.healthzServer != nil { proxier.healthzServer.UpdateTimestamp() } proxier.syncRunner.Loop(wait.NeverStop) }
Loop
方法在 pkg/util/async/bounded_frequency_runner.go
中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (bfr *BoundedFrequencyRunner) Loop (stop <-chan struct {}) { glog.V(3 ).Infof("%s Loop running" , bfr.name) bfr.timer.Reset(bfr.maxInterval) for { select { case <-stop: bfr.stop() glog.V(3 ).Infof("%s Loop stopping" , bfr.name) return case <-bfr.timer.C(): bfr.tryRun() case <-bfr.run: bfr.tryRun() } } }
它会不停的运行,定期地执行 tryRun
方法,当接收到 run 事件时,也会调用 tryRun
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (bfr *BoundedFrequencyRunner) tryRun () { bfr.mu.Lock() defer bfr.mu.Unlock() if bfr.limiter.TryAccept() { bfr.fn() bfr.lastRun = bfr.timer.Now() bfr.timer.Stop() bfr.timer.Reset(bfr.maxInterval) glog.V(3 ).Infof("%s: ran, next possible in %v, periodic in %v" , bfr.name, bfr.minInterval, bfr.maxInterval) return } }
这个 tryRun
方法会调用 bfr.fn()
,这个回调函数其实就是在 proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
注册的,也就是说所有的这些定期执行,通过事件触发执行的方法,最终会调用 syncProxyRules
。
这个方法特别长,借助设计文档里的伪代码可以帮助理解(有一些增改):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func (proxier *Proxier) syncProxyRules () { for svcName, svcInfo := range proxier.serviceMap { for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { if ingress.IP != "" { if len (svcInfo.LoadBalancerSourceRanges) != 0 { install specific iptables } } } if svcInfo.NodePort != 0 { fall back on iptables, recruit existing iptables proxier implementation } } }
以上巨长的 syncProxyRules
方法就是 kube-proxy 创建负载均衡的主要逻辑,其中 service 和 endpoint 的结果可以从之前保存的 Map 中拿到。
References